1 /*
2 * Copyright (C) 2007 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.eventbus;
18
19 import static com.google.common.base.Preconditions.checkNotNull;
20
21 import com.google.common.annotations.Beta;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Throwables;
24 import com.google.common.cache.CacheBuilder;
25 import com.google.common.cache.CacheLoader;
26 import com.google.common.cache.LoadingCache;
27 import com.google.common.collect.HashMultimap;
28 import com.google.common.collect.Multimap;
29 import com.google.common.collect.SetMultimap;
30 import com.google.common.reflect.TypeToken;
31 import com.google.common.util.concurrent.UncheckedExecutionException;
32
33 import java.lang.reflect.InvocationTargetException;
34 import java.util.Collection;
35 import java.util.LinkedList;
36 import java.util.Map.Entry;
37 import java.util.Queue;
38 import java.util.Set;
39 import java.util.concurrent.locks.ReadWriteLock;
40 import java.util.concurrent.locks.ReentrantReadWriteLock;
41 import java.util.logging.Level;
42 import java.util.logging.Logger;
43
44 /**
45 * Dispatches events to listeners, and provides ways for listeners to register
46 * themselves.
47 *
48 * <p>The EventBus allows publish-subscribe-style communication between
49 * components without requiring the components to explicitly register with one
50 * another (and thus be aware of each other). It is designed exclusively to
51 * replace traditional Java in-process event distribution using explicit
52 * registration. It is <em>not</em> a general-purpose publish-subscribe system,
53 * nor is it intended for interprocess communication.
54 *
55 * <h2>Receiving Events</h2>
56 * <p>To receive events, an object should:
57 * <ol>
58 * <li>Expose a public method, known as the <i>event subscriber</i>, which accepts
59 * a single argument of the type of event desired;</li>
60 * <li>Mark it with a {@link Subscribe} annotation;</li>
61 * <li>Pass itself to an EventBus instance's {@link #register(Object)} method.
62 * </li>
63 * </ol>
64 *
65 * <h2>Posting Events</h2>
66 * <p>To post an event, simply provide the event object to the
67 * {@link #post(Object)} method. The EventBus instance will determine the type
68 * of event and route it to all registered listeners.
69 *
70 * <p>Events are routed based on their type — an event will be delivered
71 * to any subscriber for any type to which the event is <em>assignable.</em> This
72 * includes implemented interfaces, all superclasses, and all interfaces
73 * implemented by superclasses.
74 *
75 * <p>When {@code post} is called, all registered subscribers for an event are run
76 * in sequence, so subscribers should be reasonably quick. If an event may trigger
77 * an extended process (such as a database load), spawn a thread or queue it for
78 * later. (For a convenient way to do this, use an {@link AsyncEventBus}.)
79 *
80 * <h2>Subscriber Methods</h2>
81 * <p>Event subscriber methods must accept only one argument: the event.
82 *
83 * <p>Subscribers should not, in general, throw. If they do, the EventBus will
84 * catch and log the exception. This is rarely the right solution for error
85 * handling and should not be relied upon; it is intended solely to help find
86 * problems during development.
87 *
88 * <p>The EventBus guarantees that it will not call a subscriber method from
89 * multiple threads simultaneously, unless the method explicitly allows it by
90 * bearing the {@link AllowConcurrentEvents} annotation. If this annotation is
91 * not present, subscriber methods need not worry about being reentrant, unless
92 * also called from outside the EventBus.
93 *
94 * <h2>Dead Events</h2>
95 * <p>If an event is posted, but no registered subscribers can accept it, it is
96 * considered "dead." To give the system a second chance to handle dead events,
97 * they are wrapped in an instance of {@link DeadEvent} and reposted.
98 *
99 * <p>If a subscriber for a supertype of all events (such as Object) is registered,
100 * no event will ever be considered dead, and no DeadEvents will be generated.
101 * Accordingly, while DeadEvent extends {@link Object}, a subscriber registered to
102 * receive any Object will never receive a DeadEvent.
103 *
104 * <p>This class is safe for concurrent use.
105 *
106 * <p>See the Guava User Guide article on <a href=
107 * "http://code.google.com/p/guava-libraries/wiki/EventBusExplained">
108 * {@code EventBus}</a>.
109 *
110 * @author Cliff Biffle
111 * @since 10.0
112 */
113 @Beta
114 public class EventBus {
115
116 /**
117 * A thread-safe cache for flattenHierarchy(). The Class class is immutable. This cache is shared
118 * across all EventBus instances, which greatly improves performance if multiple such instances
119 * are created and objects of the same class are posted on all of them.
120 */
121 private static final LoadingCache<Class<?>, Set<Class<?>>> flattenHierarchyCache =
122 CacheBuilder.newBuilder()
123 .weakKeys()
124 .build(new CacheLoader<Class<?>, Set<Class<?>>>() {
125 @SuppressWarnings({"unchecked", "rawtypes"}) // safe cast
126 @Override
127 public Set<Class<?>> load(Class<?> concreteClass) {
128 return (Set) TypeToken.of(concreteClass).getTypes().rawTypes();
129 }
130 });
131
132 /**
133 * All registered event subscribers, indexed by event type.
134 *
135 * <p>This SetMultimap is NOT safe for concurrent use; all access should be
136 * made after acquiring a read or write lock via {@link #subscribersByTypeLock}.
137 */
138 private final SetMultimap<Class<?>, EventSubscriber> subscribersByType =
139 HashMultimap.create();
140 private final ReadWriteLock subscribersByTypeLock = new ReentrantReadWriteLock();
141
142 /**
143 * Strategy for finding subscriber methods in registered objects. Currently,
144 * only the {@link AnnotatedSubscriberFinder} is supported, but this is
145 * encapsulated for future expansion.
146 */
147 private final SubscriberFindingStrategy finder = new AnnotatedSubscriberFinder();
148
149 /** queues of events for the current thread to dispatch */
150 private final ThreadLocal<Queue<EventWithSubscriber>> eventsToDispatch =
151 new ThreadLocal<Queue<EventWithSubscriber>>() {
152 @Override protected Queue<EventWithSubscriber> initialValue() {
153 return new LinkedList<EventWithSubscriber>();
154 }
155 };
156
157 /** true if the current thread is currently dispatching an event */
158 private final ThreadLocal<Boolean> isDispatching =
159 new ThreadLocal<Boolean>() {
160 @Override protected Boolean initialValue() {
161 return false;
162 }
163 };
164
165 private SubscriberExceptionHandler subscriberExceptionHandler;
166
167 /**
168 * Creates a new EventBus named "default".
169 */
170 public EventBus() {
171 this("default");
172 }
173
174 /**
175 * Creates a new EventBus with the given {@code identifier}.
176 *
177 * @param identifier a brief name for this bus, for logging purposes. Should
178 * be a valid Java identifier.
179 */
180 public EventBus(String identifier) {
181 this(new LoggingSubscriberExceptionHandler(identifier));
182 }
183
184 /**
185 * Creates a new EventBus with the given {@link SubscriberExceptionHandler}.
186 *
187 * @param subscriberExceptionHandler Handler for subscriber exceptions.
188 * @since 16.0
189 */
190 public EventBus(SubscriberExceptionHandler subscriberExceptionHandler) {
191 this.subscriberExceptionHandler = checkNotNull(subscriberExceptionHandler);
192 }
193
194 /**
195 * Registers all subscriber methods on {@code object} to receive events.
196 * Subscriber methods are selected and classified using this EventBus's
197 * {@link SubscriberFindingStrategy}; the default strategy is the
198 * {@link AnnotatedSubscriberFinder}.
199 *
200 * @param object object whose subscriber methods should be registered.
201 */
202 public void register(Object object) {
203 Multimap<Class<?>, EventSubscriber> methodsInListener =
204 finder.findAllSubscribers(object);
205 subscribersByTypeLock.writeLock().lock();
206 try {
207 subscribersByType.putAll(methodsInListener);
208 } finally {
209 subscribersByTypeLock.writeLock().unlock();
210 }
211 }
212
213 /**
214 * Unregisters all subscriber methods on a registered {@code object}.
215 *
216 * @param object object whose subscriber methods should be unregistered.
217 * @throws IllegalArgumentException if the object was not previously registered.
218 */
219 public void unregister(Object object) {
220 Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object);
221 for (Entry<Class<?>, Collection<EventSubscriber>> entry :
222 methodsInListener.asMap().entrySet()) {
223 Class<?> eventType = entry.getKey();
224 Collection<EventSubscriber> eventMethodsInListener = entry.getValue();
225
226 subscribersByTypeLock.writeLock().lock();
227 try {
228 Set<EventSubscriber> currentSubscribers = subscribersByType.get(eventType);
229 if (!currentSubscribers.containsAll(eventMethodsInListener)) {
230 throw new IllegalArgumentException(
231 "missing event subscriber for an annotated method. Is " + object + " registered?");
232 }
233 currentSubscribers.removeAll(eventMethodsInListener);
234 } finally {
235 subscribersByTypeLock.writeLock().unlock();
236 }
237 }
238 }
239
240 /**
241 * Posts an event to all registered subscribers. This method will return
242 * successfully after the event has been posted to all subscribers, and
243 * regardless of any exceptions thrown by subscribers.
244 *
245 * <p>If no subscribers have been subscribed for {@code event}'s class, and
246 * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
247 * DeadEvent and reposted.
248 *
249 * @param event event to post.
250 */
251 public void post(Object event) {
252 Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
253
254 boolean dispatched = false;
255 for (Class<?> eventType : dispatchTypes) {
256 subscribersByTypeLock.readLock().lock();
257 try {
258 Set<EventSubscriber> wrappers = subscribersByType.get(eventType);
259
260 if (!wrappers.isEmpty()) {
261 dispatched = true;
262 for (EventSubscriber wrapper : wrappers) {
263 enqueueEvent(event, wrapper);
264 }
265 }
266 } finally {
267 subscribersByTypeLock.readLock().unlock();
268 }
269 }
270
271 if (!dispatched && !(event instanceof DeadEvent)) {
272 post(new DeadEvent(this, event));
273 }
274
275 dispatchQueuedEvents();
276 }
277
278 /**
279 * Queue the {@code event} for dispatch during
280 * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence
281 * so they can be dispatched in the same order.
282 */
283 void enqueueEvent(Object event, EventSubscriber subscriber) {
284 eventsToDispatch.get().offer(new EventWithSubscriber(event, subscriber));
285 }
286
287 /**
288 * Drain the queue of events to be dispatched. As the queue is being drained,
289 * new events may be posted to the end of the queue.
290 */
291 void dispatchQueuedEvents() {
292 // don't dispatch if we're already dispatching, that would allow reentrancy
293 // and out-of-order events. Instead, leave the events to be dispatched
294 // after the in-progress dispatch is complete.
295 if (isDispatching.get()) {
296 return;
297 }
298
299 isDispatching.set(true);
300 try {
301 Queue<EventWithSubscriber> events = eventsToDispatch.get();
302 EventWithSubscriber eventWithSubscriber;
303 while ((eventWithSubscriber = events.poll()) != null) {
304 dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
305 }
306 } finally {
307 isDispatching.remove();
308 eventsToDispatch.remove();
309 }
310 }
311
312 /**
313 * Dispatches {@code event} to the subscriber in {@code wrapper}. This method
314 * is an appropriate override point for subclasses that wish to make
315 * event delivery asynchronous.
316 *
317 * @param event event to dispatch.
318 * @param wrapper wrapper that will call the subscriber.
319 */
320 void dispatch(Object event, EventSubscriber wrapper) {
321 try {
322 wrapper.handleEvent(event);
323 } catch (InvocationTargetException e) {
324 try {
325 subscriberExceptionHandler.handleException(
326 e.getCause(),
327 new SubscriberExceptionContext(
328 this,
329 event,
330 wrapper.getSubscriber(),
331 wrapper.getMethod()));
332 } catch (Throwable t) {
333 // If the exception handler throws, log it. There isn't much else to do!
334 Logger.getLogger(EventBus.class.getName()).log(Level.SEVERE,
335 String.format(
336 "Exception %s thrown while handling exception: %s", t,
337 e.getCause()),
338 t);
339 }
340 }
341 }
342
343 /**
344 * Flattens a class's type hierarchy into a set of Class objects. The set
345 * will include all superclasses (transitively), and all interfaces
346 * implemented by these superclasses.
347 *
348 * @param concreteClass class whose type hierarchy will be retrieved.
349 * @return {@code clazz}'s complete type hierarchy, flattened and uniqued.
350 */
351 @VisibleForTesting
352 Set<Class<?>> flattenHierarchy(Class<?> concreteClass) {
353 try {
354 return flattenHierarchyCache.getUnchecked(concreteClass);
355 } catch (UncheckedExecutionException e) {
356 throw Throwables.propagate(e.getCause());
357 }
358 }
359
360 /**
361 * Simple logging handler for subscriber exceptions.
362 */
363 private static final class LoggingSubscriberExceptionHandler
364 implements SubscriberExceptionHandler {
365
366 /**
367 * Logger for event dispatch failures. Named by the fully-qualified name of
368 * this class, followed by the identifier provided at construction.
369 */
370 private final Logger logger;
371
372 /**
373 * @param identifier a brief name for this bus, for logging purposes. Should
374 * be a valid Java identifier.
375 */
376 public LoggingSubscriberExceptionHandler(String identifier) {
377 logger = Logger.getLogger(
378 EventBus.class.getName() + "." + checkNotNull(identifier));
379 }
380
381 @Override
382 public void handleException(Throwable exception,
383 SubscriberExceptionContext context) {
384 logger.log(Level.SEVERE, "Could not dispatch event: "
385 + context.getSubscriber() + " to " + context.getSubscriberMethod(),
386 exception.getCause());
387 }
388 }
389
390 /** simple struct representing an event and it's subscriber */
391 static class EventWithSubscriber {
392 final Object event;
393 final EventSubscriber subscriber;
394 public EventWithSubscriber(Object event, EventSubscriber subscriber) {
395 this.event = checkNotNull(event);
396 this.subscriber = checkNotNull(subscriber);
397 }
398 }
399 }